package com.z.tableapi

import org.apache.flink.table.api._

/**
 * @Author wenz.ma
 * @Date 2021/10/27 17:52
 * @Desc cdc 实时同步mysql表数据
 */
object Mysql2MysqlWithCDC {
  def main(args: Array[String]): Unit = {
    //创建环境
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)
    //创建表user01 - 使用 mysql-cdc connector
    tEnv.executeSql(
      """
        |create table user01 (
        |id int ,
        |name string,
        |PRIMARY KEY  (id) NOT ENFORCED
        |)with(
        |'connector' = 'mysql-cdc',
        |'hostname' = 'server120',
        |'port' = '3306',
        |'username' = 'flink_test',
        |'password' = 'flink_test',
        |'database-name' = 'flink_test',
        |'table-name' = 'user01',
        |'scan.incremental.snapshot.enabled' = 'false'
        |)
        |""".stripMargin)
    //创建表user02 - 使用jdbc connector
    tEnv.executeSql(
      """
        |create table user02 (
        |id int PRIMARY KEY,
        |name string
        |)with(
         'connector' = 'jdbc',
        | 'url' = 'jdbc:mysql://server120:3306/flink_test',
        | 'table-name' = 'user02',
        | 'username' = 'flink_test',
        | 'password' = 'flink_test'
        |)
        |""".stripMargin)

    //将user01同步到user02
    tEnv.from("user01").executeInsert("user02")
  }
}
